#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Teams sub-commands."""

from __future__ import annotations

from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError

from airflow.cli.simple_table import AirflowConsole
from airflow.models.connection import Connection
from airflow.models.pool import Pool
from airflow.models.team import Team, dag_bundle_team_association_table
from airflow.models.variable import Variable
from airflow.utils import cli as cli_utils
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, provide_session

NO_TEAMS_LIST_MSG = "No teams found."


def _show_teams(teams, output):
    """Display teams in the specified output format."""
    AirflowConsole().print_as(
        data=teams,
        output=output,
        mapper=lambda x: {
            "name": x.name,
        },
    )


def _extract_team_name(args):
    """Extract and validate team name from args."""
    team_name = args.name.strip()
    if not team_name:
        raise SystemExit("Team name cannot be empty")
    return team_name


@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
def team_create(args, session=NEW_SESSION):
    """Create a new team."""
    team_name = _extract_team_name(args)

    # Check if team with this name already exists
    if session.scalar(select(Team).where(Team.name == team_name)):
        raise SystemExit(f"Team with name '{team_name}' already exists")

    # Create new team (UUID will be auto-generated by the database)
    new_team = Team(name=team_name)

    try:
        session.add(new_team)
        session.commit()
        print(f"Team '{team_name}' created successfully.")
    except IntegrityError as e:
        session.rollback()
        raise SystemExit(f"Failed to create team '{team_name}': {e}")


@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
def team_delete(args, session=NEW_SESSION):
    """Delete a team after checking for associations."""
    team_name = _extract_team_name(args)

    # Find the team
    team = session.scalar(select(Team).where(Team.name == team_name))
    if not team:
        raise SystemExit(f"Team '{team_name}' does not exist")

    # Check for associations
    associations = []

    # Check DAG bundle associations
    dag_bundle_count = session.scalar(
        select(func.count())
        .select_from(dag_bundle_team_association_table)
        .where(dag_bundle_team_association_table.c.team_name == team.name)
    )
    if dag_bundle_count:
        associations.append(f"{dag_bundle_count} DAG bundle(s)")

    # Check connection associations
    if connection_count := session.scalar(
        select(func.count(Connection.id)).where(Connection.team_name == team.name)
    ):
        associations.append(f"{connection_count} connection(s)")

    # Check variable associations
    if variable_count := session.scalar(
        select(func.count(Variable.id)).where(Variable.team_name == team.name)
    ):
        associations.append(f"{variable_count} variable(s)")

    # Check pool associations
    if pool_count := session.scalar(select(func.count(Pool.id)).where(Pool.team_name == team.name)):
        associations.append(f"{pool_count} pool(s)")

    # If there are associations, prevent deletion
    if associations:
        association_list = ", ".join(associations)
        raise SystemExit(
            f"Cannot delete team '{team_name}' because it is associated with: {association_list}. "
            f"Please remove these associations first."
        )

    # Confirm deletion if not using --yes flag
    if not args.yes:
        confirmation = input(f"Are you sure you want to delete team '{team_name}'? (y/N): ")
        if confirmation.upper() != "Y":
            print("Team deletion cancelled")
            return

    # Delete the team
    try:
        session.delete(team)
        session.commit()
        print(f"Team '{team_name}' deleted successfully")
    except Exception as e:
        session.rollback()
        raise SystemExit(f"Failed to delete team '{team_name}': {e}")


@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
def team_list(args, session=NEW_SESSION):
    """List all teams."""
    teams = session.scalars(select(Team).order_by(Team.name)).all()
    if not teams:
        print(NO_TEAMS_LIST_MSG)
    else:
        _show_teams(teams=teams, output=args.output)
